package com.smile.cloud.order.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * 延时消费者
 *
 * @author LGC
 */
@Slf4j
@Component
public class DelayConsumer {
    public final static String DELAY_CONSUMER_GROUP = "delay_consumer_group";
    public final static String TOPIC_ORDER_DELAY = "order_delay";

    @Resource
    private DelayOrderListener delayOrderListener;

    @PostConstruct
    public void init() {
        log.info(DELAY_CONSUMER_GROUP + " consumer 正在创建---------------------------------------");
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(DELAY_CONSUMER_GROUP);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeThreadMin(5);
        consumer.setConsumeThreadMax(32);
        consumer.setConsumeMessageBatchMaxSize(3);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setMessageListener(delayOrderListener);
        try {
            consumer.subscribe(TOPIC_ORDER_DELAY, "*");
            consumer.start();
            log.info(DELAY_CONSUMER_GROUP + " consumer server 开启成功----------------------------------");
        } catch (MQClientException e) {
            log.info(DELAY_CONSUMER_GROUP + " consumer server 开启异常：{0}----------------------------------", e);
        }
    }

}
